package com.opencee.cloud.msg.utils;

import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSONObject;
import com.opencee.cloud.msg.api.constatns.MsgConstants;
import com.opencee.cloud.msg.api.vo.DelayedMessage;
import com.opencee.cloud.msg.api.vo.params.DelayedParams;
import com.opencee.cloud.msg.api.vo.params.TopicMessage;
import com.opencee.cloud.msg.api.vo.params.TopicParams;
import org.springframework.amqp.core.*;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Static helpers for declaring the delayed/topic exchanges and for publishing
 * delayed and topic (broadcast) messages through an {@link AmqpTemplate}.
 *
 * <p>All publishing methods are {@code static synchronized}, i.e. they are
 * serialized on the {@code MqUtil} class monitor.
 *
 * @author yadu
 */
public class MqUtil {

    /**
     * Builds the delayed-message exchange.
     *
     * <p>The exchange is a {@link CustomExchange} of type {@code x-delayed-message}
     * (the exchange type provided by the RabbitMQ delayed-message-exchange plugin),
     * declared durable and not auto-deleted. The underlying routing behaviour is
     * selected with the argument {@code x-delayed-type=direct}. Publishers specify
     * the delay per message, e.g. {@code headers={'x-delay': 8000}}.
     *
     * @return the exchange definition named {@link MsgConstants#DELAY_EXCHANGE}
     */
    public static CustomExchange buildDelayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(MsgConstants.DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
     * Builds the topic (broadcast) exchange.
     *
     * @return a durable topic exchange named {@link MsgConstants#TOPIC_EXCHANGE}
     */
    public static Exchange buildTopicExchange() {
        // Durable: the exchange definition survives a RabbitMQ broker restart.
        return ExchangeBuilder.topicExchange(MsgConstants.TOPIC_EXCHANGE).durable(true).build();
    }

    /**
     * Binds a queue to the delayed exchange.
     *
     * @param queue         the queue to bind
     * @param routeKey      the routing key used for the binding
     * @param delayExchange the delayed exchange to bind to
     * @return the binding, without additional arguments
     */
    public static Binding bindingDelayedQueue(Queue queue, String routeKey, Exchange delayExchange) {
        return BindingBuilder.bind(queue).to(delayExchange).with(routeKey).noargs();
    }


    /**
     * Sends a single delayed message.
     *
     * <p>Wraps the message in a {@link DelayedParams} and delegates to
     * {@link #delayed(AmqpTemplate, DelayedParams)}.
     *
     * @param amqpTemplate template used to publish; must not be {@code null}
     * @param message      the message to send
     * @return {@code true} if the message was handed to the template
     * @throws IllegalArgumentException if {@code amqpTemplate} is {@code null} or the
     *                                  delay exceeds {@link MsgConstants#MAX_DELAY}
     * @throws Exception                declared for backward compatibility
     */
    public static synchronized boolean delayed(AmqpTemplate amqpTemplate, DelayedMessage message) throws Exception {
        DelayedParams delayedSendParams = new DelayedParams();
        delayedSendParams.add(message);
        return delayed(amqpTemplate, delayedSendParams);
    }

    /**
     * Sends a batch of delayed messages to {@link MsgConstants#DELAY_EXCHANGE}.
     *
     * <p>Every message is validated against {@link MsgConstants#MAX_DELAY} before
     * anything is published, so an invalid entry no longer leaves a partially
     * sent batch behind. Negative delays are clamped to {@code 0}. Each message
     * is serialized to JSON and routed with its own route key.
     *
     * @param amqpTemplate      template used to publish; must not be {@code null}
     * @param delayedSendParams holder of the messages to send; must be non-null
     *                          and contain at least one message
     * @return {@code true} if at least one message was handed to the template
     * @throws IllegalArgumentException if {@code amqpTemplate} or {@code delayedSendParams}
     *                                  is {@code null}, the message list is empty, or a
     *                                  delay exceeds {@link MsgConstants#MAX_DELAY}
     * @throws Exception                declared for backward compatibility
     */
    public static synchronized boolean delayed(AmqpTemplate amqpTemplate, DelayedParams delayedSendParams) throws Exception {
        if (amqpTemplate == null) {
            throw new IllegalArgumentException("amqpTemplate不能为空");
        }
        if (delayedSendParams == null) {
            throw new IllegalArgumentException("消息列表不能为空");
        }
        Assert.notEmpty(delayedSendParams.getDelayedMessageList(), "消息列表不能为空");

        // Validate the whole batch first so that a bad entry cannot cause a partial send.
        delayedSendParams.getDelayedMessageList().forEach(m -> {
            if (m.getTimes() > MsgConstants.MAX_DELAY) {
                throw new IllegalArgumentException("延迟时间最大不能超过15天");
            }
        });

        AtomicInteger count = new AtomicInteger();
        delayedSendParams.getDelayedMessageList().forEach(m -> {
            // Clamp before serializing so the JSON payload carries the effective delay.
            if (m.getTimes() < 0) {
                m.setTimes(0);
            }
            String delay = String.valueOf(m.getTimes());
            amqpTemplate.convertAndSend(MsgConstants.DELAY_EXCHANGE, m.getRouteKey(), JSONObject.toJSONString(m), message -> {
                String messageId = m.getMsgId();
                // Generate an id when the caller did not supply one.
                if (!StringUtils.hasLength(messageId)) {
                    messageId = IdUtil.simpleUUID();
                }
                message.getMessageProperties().setMessageId(messageId);
                message.getMessageProperties().setTimestamp(new Date());
                message.getMessageProperties().setType("x-delayed-message");
                // Per-message delay, i.e. headers={'x-delay': 8000} when publishing.
                message.getMessageProperties().setDelay(Integer.parseInt(delay));
                // The x-delay header cannot be read back in this version, so the delay
                // is duplicated in a custom "delay-times" header for later lookup.
                message.getMessageProperties().setHeader("delay-times", delay);
                return message;
            });
            // Count each published message; the result drives the return value.
            count.incrementAndGet();
        });
        return count.get() > 0;
    }


    /**
     * Sends a single topic (broadcast) message.
     *
     * <p>Wraps the message in a {@link TopicParams} and delegates to
     * {@link #topic(AmqpTemplate, TopicParams)}.
     *
     * @param amqpTemplate template used to publish; must not be {@code null}
     * @param topicMessage the message to send
     * @return {@code true} if the message was handed to the template
     * @throws IllegalArgumentException if {@code amqpTemplate} is {@code null}
     */
    public static synchronized boolean topic(AmqpTemplate amqpTemplate, TopicMessage topicMessage) {
        if (amqpTemplate == null) {
            throw new IllegalArgumentException("amqpTemplate不能为空");
        }
        TopicParams topicMessageParams = new TopicParams();
        topicMessageParams.add(topicMessage);
        return topic(amqpTemplate, topicMessageParams);
    }

    /**
     * Sends a batch of topic (broadcast) messages to {@link MsgConstants#TOPIC_EXCHANGE}.
     *
     * <p>Only messages whose route key is non-null and starts with
     * {@link MsgConstants#TOPIC_QUEUE_PREFIX} are published; all others are
     * skipped silently. Each published message is serialized to JSON.
     *
     * @param amqpTemplate       template used to publish; must not be {@code null}
     * @param topicMessageParams holder of the messages to send
     * @return {@code true} if at least one message was handed to the template;
     *         {@code false} if the holder is {@code null}, its list is null/empty,
     *         or no message had an eligible route key
     * @throws IllegalArgumentException if {@code amqpTemplate} is {@code null}
     */
    public static synchronized boolean topic(AmqpTemplate amqpTemplate, TopicParams topicMessageParams) {
        if (amqpTemplate == null) {
            throw new IllegalArgumentException("amqpTemplate不能为空");
        }
        if (topicMessageParams == null
                || topicMessageParams.getTopicMessageList() == null
                || topicMessageParams.getTopicMessageList().isEmpty()) {
            return false;
        }
        AtomicInteger count = new AtomicInteger();
        topicMessageParams.getTopicMessageList().forEach(m -> {
            if (m.getRouteKey() != null && m.getRouteKey().startsWith(MsgConstants.TOPIC_QUEUE_PREFIX)) {
                amqpTemplate.convertAndSend(MsgConstants.TOPIC_EXCHANGE, m.getRouteKey(), JSONObject.toJSONString(m));
                count.incrementAndGet();
            }
        });
        return count.get() > 0;
    }
}
